Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate messages in UCX #3732

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open

Conversation

jakirkham
Copy link
Member

@jakirkham jakirkham commented Apr 21, 2020

Requires PR ( #3731 )

To cutdown on the number of messages and increase the size of messages, this packs messages into as few frames as possible. The messages are as follows:

  1. # of frames that will be transmitted
  2. Metadata about all frames (whether on device, what size) Included in PR ( Relax NumPy requirement in UCX #3731 )
  3. All host frames concatenated
  4. All device frames concatenated

1 is the same as before. Not much that can be done here as we need this to gauge how much space we need to allocate for the follow messages.

2 combines separate messages into a single message. This works nicely thanks to struct.packs and struct.unpacks ability to work with heterogeneous types easily. In other words this benefits from the work already done in PR ( #3731 ).

For 3 and 4, these previously used separate messages for each frame. Now we allocate one buffer to hold all host frames and one to hold all device frames. On the send side, we pack all the frames together into one of these buffers and send them over (as long as they are not empty). On the receive side, we allocate space for and then receive the host buffer and device buffer. Afterwards we unpack these into the original frames.

The benefit of this change is that we send at most 4 messages (less if there are no host or device frames) and we send the largest possible amount of data we can at one time. As a result we should get more out of each transmission.

However the tradeoff is we need to allocate space to pack the frames and we need to allocate space to unpack the frames. So we use roughly 2x the memory that we had previously. There has been discussion in issue ( https://github.com/rapidsai/rmm/issues/318 ) and issue ( rapidsai/cudf#3793 ) about having an implementation this does not require the additional memory, but no such implementation exists today.

FWIW I've run the test suite on this PR successfully. Though please feel free to play as well. 🙂

cc @quasiben @madsbk @pentschev @rjzamora @kkraus14 @jrhemstad @randerzander

if each_size:
if is_cuda:
each_frame_view = as_numba_device_array(each_frame)
device_frames_view[:each_size] = each_frame_view[:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know what this translates to in Numba under the hood? We could likely implement this __setitem__ in RMM on DeviceBuffer if desired which typically reduces the overhead quite a bit versus Numba.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't.

We probably could. Would rather wait until we have run this on some real workloads so we can assess and plan next steps.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah after poking at this a bit. I think you are right. It's worth adding __setitem__ to DeviceBuffer.

Reused the MRE from issue ( rapidsai/ucx-py#402 ) as a quick test. The bar selected is for the __setitem__ calls in send. There is another one for recv. Should add as_cuda_array doesn't appear cheap either. Avoiding these seems highly desirable.

Screen Shot 2020-04-20 at 9 40 44 PM

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add we would need __getitem__ as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a rough draft of this in PR ( rapidsai/rmm#351 ).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the logic here a bit so that CuPy will be used if available, but will fallback to Numba if not. Am not seeing the same issue with CuPy.

@pentschev
Copy link
Member

I must say as much as I like the idea of packing things together, I'm a bit concerned with doubling the memory footprint. As we have already many memory pressing cases, I think it's unwise to make this a hard feature. I'm definitely +1 for packing the metadata together, but we should be careful with packing the data. One thing I was thinking we could do is perhaps introduce a configuration option to enable/disable packing of data frames to reduce memory footprint. Any thoughts on this?

@jakirkham
Copy link
Member Author

Just to be clear it is doubling the memory usage per object being transmitted during its transmission. So it is not as simple as doubling all memory or for the full length of the program. It might be that this doesn't matter that much. Ultimately we need to see some testing on typical workflows to know for sure. 🙂

That said, I don't mind a config option if we find it is problematic, but maybe let's confirm it is before going down that road 😉

@pentschev
Copy link
Member

Just to be clear it is doubling the memory usage per object being transmitted during its transmission. So it is not as simple as doubling all memory or for the full length of the program. It might be that this doesn't matter that much. Ultimately we need to see some testing on typical workflows to know for sure. 🙂

I understand that, it is nevertheless necessary to double potentially large chunks, which is undesirable. We also have to take into account the overhead of making a copy of such buffers which may remove in part the advantage of packing them together.

That said, I don't mind a config option if we find it is problematic, but maybe let's confirm it is before going down that road 😉

I would then rather confirm that this PR is not introducing undesirable effects first rather than working around that later.

@jakirkham
Copy link
Member Author

I think we are on the same page. I'd like people to try it and report feedback before we consider merging.

@pentschev
Copy link
Member

I think we are on the same page. I'd like people to try it and report feedback before we consider merging.

Awesome, please keep us posted. Let me know if I can assist on the testing.

@jakirkham
Copy link
Member Author

Help testing it would be welcome 🙂

Thus far have tried the MRE from issue ( rapidsai/ucx-py#402 ) where it seems to help.

@pentschev
Copy link
Member

Thus far have tried the MRE from issue ( rapidsai/ucx-py#402 ) where it seems to help.

Could you elaborate on what you mean by "seems to help"?

Another question: have you happened to confirm whether transfers happen over NVLink?

@pentschev
Copy link
Member

My tests show an improvement of this PR versus the current master branch, so definitely +1 from that perspective.

I'm not able to evaluate memory footprint right now, but I'm hoping @beckernick or @VibhuJawa could test this in a workflow that's very demanding of memory, hopefully one where they know to be just at the boundary of memory utilization. I think this may give us a clear picture of real impact of this PR.

@jakirkham
Copy link
Member Author

Thanks for testing this as well. Sorry for the slow reply (dropped off for the evening).

Could you elaborate on what you mean by "seems to help"?

I ran the workflow and looked at the various Dask dashboards. When trying this PR, 2.14.0, and master (to ensure other changes in 2.14.0 weren't also affecting it). No real difference between 2.14.0 and master. This seemed to run faster and have higher bandwidth between workers.

That said, I hardly think that is enough testing or even the right diagnostics. So that's why I left it at "seems to help" 🙂

Another question: have you happened to confirm whether transfers happen over NVLink?

I hadn't confirmed this yet. Though NVLink was enabled when I ran in all cases before. Of course that isn't confirmation that it works 😉


Agree it would be great if Nick and Vibhu could try. Would be interested in seeing how this performs in more workloads 😄

@pentschev
Copy link
Member

I hadn't confirmed this yet. Though NVLink was enabled when I ran in all cases before. Of course that isn't confirmation that it works 😉

I forgot to mention in my previous comment that I confirmed to see NVLink traffic, so that is fine.

Apart from that, I would only like to see some testing of larger workflows, if that presents no issues then we should be good. Hopefully Vibhu or Nick will have a chance to test it the next few days or so.

@pentschev
Copy link
Member

And of course, thanks for the nice work @jakirkham !

Copy link
Member

@kkraus14 kkraus14 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely need to short circuit the packing if we only have 1 device frame or 1 host frame, and then I think there's some opportunities to fuse loops together to be more efficient.

distributed/comm/ucx.py Outdated Show resolved Hide resolved
distributed/comm/ucx.py Outdated Show resolved Hide resolved
distributed/comm/ucx.py Outdated Show resolved Hide resolved
distributed/comm/ucx.py Outdated Show resolved Hide resolved
Provides a function to let us coerce our underlying
`__cuda_array_interface__` objects into something that behaves more like
an array. Prefers CuPy if possible, but will fallback to Numba if its
not available.
To cutdown on the number of send/recv operations and also to transmit
larger amounts of data at a time, this condenses all frames into a host
buffer and a device buffer, which are sent as two separate
transmissions.
No need to concatenate them together in this case.
To optimize concatenation in the case where NumPy and CuPy are around,
just use their `concatenate` functions. However when they are absent
fallback to some hand-rolled concatenate routines.
distributed/comm/ucx.py Outdated Show resolved Hide resolved
To optimize the case where NumPy and CuPy are around, simply use their
`split` function to pull apart large frames into smaller chunks.
@pentschev
Copy link
Member

I'm now seeing the following errors just as workers connect to the scheduler. Errors on scheduler:

ucp.exceptions.UCXMsgTruncated: Comm Error "[Recv #002] ep: 0x7fac27641380, tag: 0xf2597f095b80a8c, nbytes: 1179, type: <class 'numpy.ndarray'>": length mismatch: 2358 (got) != 1179 (expected)
distributed.utils - ERROR - Comm Error "[Recv #002] ep: 0x7fac27641460, tag: 0x1a3986bf8dbfcb58, nbytes: 31, type: <class 'numpy.ndarray'>": length mismatch: 62 (got) != 31 (expected)
Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/comm/ucx.py", line 359, in read
    await self.ep.recv(host_frames)
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/ucp/core.py", line 538, in recv
    ret = await ucx_api.tag_recv(self._ep, buffer, nbytes, tag, log_msg=log)
ucp.exceptions.UCXMsgTruncated: Comm Error "[Recv #002] ep: 0x7fac27641460, tag: 0x1a3986bf8dbfcb58, nbytes: 31, type: <class 'numpy.ndarray'>": length mismatch: 62 (got) != 31 (expected)
distributed.core - ERROR - Comm Error "[Recv #002] ep: 0x7fac27641460, tag: 0x1a3986bf8dbfcb58, nbytes: 31, type: <class 'numpy.ndarray'>": length mismatch: 62 (got) != 31 (expected)
Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/core.py", line 337, in handle_comm
    msg = await comm.read()
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/comm/ucx.py", line 359, in read
    await self.ep.recv(host_frames)
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/ucp/core.py", line 538, in recv
    ret = await ucx_api.tag_recv(self._ep, buffer, nbytes, tag, log_msg=log)

Errors on worker:

Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/nanny.py", line 737, in run
    await worker
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/worker.py", line 1060, in start
    await self._register_with_scheduler()
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/worker.py", line 844, in _register_with_scheduler
    response = await future
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/comm/ucx.py", line 359, in read
    await self.ep.recv(host_frames)
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/ucp/core.py", line 538, in recv
    ret = await ucx_api.tag_recv(self._ep, buffer, nbytes, tag, log_msg=log)
ucp.exceptions.UCXError: Error receiving "[Recv #002] ep: 0x7f81f69d8070, tag: 0x1da2edb6ba7bb4fb, nbytes: 2102, type: <class 'numpy.ndarray'>": Message truncated
distributed.utils - ERROR - Error receiving "[Recv #002] ep: 0x7fc7a7595070, tag: 0x112d16f17d226012, nbytes: 2092, type: <class 'numpy.ndarray'>": Message truncated
Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/distributed/comm/ucx.py", line 359, in read
    await self.ep.recv(host_frames)
  File "/datasets/pentschev/miniconda3/envs/ucx-src-102-0.14.0b200424/lib/python3.7/site-packages/ucp/core.py", line 538, in recv
    ret = await ucx_api.tag_recv(self._ep, buffer, nbytes, tag, log_msg=log)
ucp.exceptions.UCXError: Error receiving "[Recv #002] ep: 0x7fc7a7595070, tag: 0x112d16f17d226012, nbytes: 2092, type: <class 'numpy.ndarray'>": Message truncated

I've tested this PR a couple of days ago and it used to work fine, so I think some of the latest changes caused this.

@jakirkham
Copy link
Member Author

I would not try to use this atm. Some of the new commits have bugs.

@jakirkham
Copy link
Member Author

This should be back to a state that you can play with @pentschev. Was able to run the workflow in issue ( rapidsai/ucx-py#402 ) as a test case.

@pentschev
Copy link
Member

pentschev commented Apr 28, 2020

@jakirkham performance-wise, I'd say this is a good improvement. I did some runs with 4 DGX-1 nodes using the code from rapidsai/ucx-py#402 (comment), please see details below:

IB
Create time: 31.083641052246094
Merge time: 62.41462779045105
Create time: 30.35321569442749
Merge time: 66.1707193851471
Create time: 29.627256631851196
Merge time: 67.02337288856506

IB - Distributed Pack Frames
Create time: 23.928910970687866
Merge time: 52.785187005996704
Create time: 23.919641256332397
Merge time: 52.29404807090759
Create time: 23.251129627227783
Merge time: 53.747430086135864


IB+NV
Create time: 30.51845955848694
Merge time: 63.51215887069702
Create time: 30.213918209075928
Merge time: 69.68081068992615
Create time: 30.7943594455719
Merge time: 71.24144387245178

IB+NV - Distributed Pack Frames
Create time: 23.833109378814697
Merge time: 54.470152378082275
Create time: 24.106595516204834
Merge time: 53.03110957145691
Create time: 23.372838735580444
Merge time: 54.14150428771973


NV
Create time: 27.96121573448181
Merge time: 68.33352589607239
Create time: 28.454411029815674
Merge time: 69.2346978187561
Create time: 28.770456552505493
Merge time: 68.84859800338745

NV - Distributed Pack Frames
Create time: 25.76459288597107
Merge time: 110.81587362289429
Create time: 26.054383516311646
Merge time: 58.413026094436646
Create time: 27.28168559074402
Merge time: 58.45553708076477


TCP over UCX
Create time: 28.669108390808105
Merge time: 67.4001305103302
Create time: 29.40789222717285
Merge time: 70.51220607757568
Create time: 30.285409212112427
Merge time: 70.55179977416992

TCP over UCX - Distributed Pack Frames
Create time: 25.65769076347351
Merge time: 110.63008999824524
Create time: 25.772521495819092
Merge time: 57.16732382774353
Create time: 26.856338262557983
Merge time: 58.4323296546936


Python Sockets (TCP)
Create time: 26.35135531425476
Merge time: 53.146289348602295
Create time: 26.18475842475891
Merge time: 56.93609070777893
Create time: 26.424189805984497
Merge time: 56.36023283004761

In our call offline I had already mentioned it, I know at some point we were close to the performance we see with this PR now and there were probably regressions along the way. One other thing that I think may have been the cause was the introduction of CUDA synchronization in Dask, which is necessary but perhaps it wasn't there when we achieved good performance.

TL;DR: for this particular test, IB is now slightly faster than using Python sockets for communication. This one sample has been very difficult to get faster than just Python sockets and I believe there are other workflows that will achieve better performance, as we've seen in the past already.

@jakirkham
Copy link
Member Author

Thanks Peter! Can you please share a bit about where this was run?

@pentschev
Copy link
Member

Thanks Peter! Can you please share a bit about where this was run?

This was run on a small cluster of 4 DGX-1 nodes, I updated my post above to reflect that.

@jakirkham
Copy link
Member Author

Added some logic in the most recent commits to filter out empty frames from the concatenation process and avoid allocating empty frames unless they are used by the serialized object. This puts many more common cases on the fast path. Also fixes the segfaulting test problem seen before. So have restored that test.

As `.copy()` calls `memcpy`, which is synchronous, performance is worse
as we synchronize after copying each part of the buffer. To fix this,
we switch to `cupy.copyto` with calls `memcpyasync`. This lets us avoid
having a synchronize after each copy.
Copy link
Member Author

@jakirkham jakirkham left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed that device_split was synchronizing a lot when copying data. Have made some more changes explained below, which cut this down to one synchronize step. Should dramatically improve performance during unpacking (so on recv).

result_buffers = []
for e in ary_split:
e2 = cupy.empty_like(e)
cupy.copyto(e2, e)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally was calling e.copy() however this calls cudaMemcpy, which is synchronous under-the-hood. So have switched to allocating arrays and calling copyto, which uses cudaMemcpyAsync. Should cutdown on the overhead of copying data into smaller buffers during unpacking.

cupy.copyto(e2, e)
results.append(e2)
result_buffers.append(e2.data.mem._owner)
cupy.cuda.stream.get_current_stream().synchronize()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we synchronize once to ensure all of the copies did complete. We do this to make sure the data is valid before the DeviceBuffer it came from is freed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the DeviceBuffer goes out of scope I believe the actual freeing of memory is enqueued onto the stream, so you should be fine due to stream ordering. @harrism @jrhemstad is that correct from the perspective of RMM?

If this is just a normal cuda memory allocation that will call cudaFree then that also gets enqueued onto the stream so you should be safe in general without this synchronize I believe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool I went ahead and dropped it. If we need it back, it is easy to revert.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to make sure that the buffer's memory was never used on a different stream than the one it will be deleted on without synchronizing the streams before deleting.

[Answer to Keith's question: It's not strictly to correct to say the freeing of memory is enqueued onto the stream. Just that an allocation on the same stream will be allowed to use the freed block in stream order (e.g. safely). An allocation on a different stream will only use that block after the streams are synchronized. ]

Base automatically changed from master to main March 8, 2021 19:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants